#pragma once

#include <thread>
#include <boost/noncopyable.hpp>
#include <DB/Core/Types.h>
#include <DB/Common/ConcurrentBoundedQueue.h>
#include <DB/Storages/IStorage.h>
#include <DB/Interpreters/Context.h>
#include <DB/Common/Stopwatch.h>
#include <DB/Parsers/ASTCreateQuery.h>
#include <DB/Parsers/parseQuery.h>
#include <DB/Parsers/ExpressionElementParsers.h>
#include <DB/Parsers/ASTRenameQuery.h>
#include <DB/Parsers/formatAST.h>
#include <DB/Parsers/ASTInsertQuery.h>
#include <DB/Interpreters/InterpreterCreateQuery.h>
#include <DB/Interpreters/InterpreterRenameQuery.h>
#include <DB/Interpreters/InterpreterInsertQuery.h>
#include <DB/Common/setThreadName.h>


namespace DB
{


/** Allow to store structured log in system table.
  *
  * Logging is asynchronous. Data is put into queue from where it will be read by separate thread.
  * That thread inserts log into a table with no more than specified periodicity.
  */

/** Structure of log, template parameter.
  * Structure could change on server version update.
  * If on first write, existing table has different structure,
  *  then it get renamed (put aside) and new table is created.
  */
/* Example:
	struct LogElement
	{
		/// default constructor must be available
		/// fields

		static std::string name();
		static Block createBlock();
		void appendToBlock(Block & block) const;
	};
	*/


#define DBMS_SYSTEM_LOG_QUEUE_SIZE 1024

class Context;


template <typename LogElement>
class SystemLog : private boost::noncopyable
{
public:

	/** Parameter: table name where to write log.
	  * If table is not exists, then it get created with specified engine.
	  * If it already exists, then its structure is checked to be compatible with structure of log record.
	  *  If it is compatible, then existing table will be used.
	  *  If not - then existing table will be renamed to same name but with suffix '_N' at end,
	  *   where N - is a minimal number from 1, for that table with corresponding name doesn't exist yet;
	  *   and new table get created - as if previous table was not exist.
	  */
	SystemLog(
		Context & context_,
		const String & database_name_,
		const String & table_name_,
		const String & engine_,
		size_t flush_interval_milliseconds_);

	~SystemLog();

	/** Append a record into log.
	  * Writing to table will be done asynchronously and in case of failure, record could be lost.
	  */
	void add(const LogElement & element)
	{
		/// We could lock here in case of queue overflow. Maybe better to throw an exception or even don't do logging in that case.
		queue.push({false, element});
	}

private:
	Context & context;
	const String database_name;
	const String table_name;
	StoragePtr table;
	const String engine;
	const size_t flush_interval_milliseconds;

	using QueueItem = std::pair<bool, LogElement>;		/// First element is shutdown flag for thread.

	/// Queue is bounded. But its size is quite large to not block in all normal cases.
	ConcurrentBoundedQueue<QueueItem> queue {DBMS_SYSTEM_LOG_QUEUE_SIZE};

	/** Data that was pulled from queue. Data is accumulated here before enough time passed.
	  * It's possible to implement double-buffering, but we assume that insertion into table is faster
	  *  than accumulation of large amount of log records (for example, for query log - processing of large amount of queries).
	  */
	std::vector<LogElement> data;

	Logger * log;

	/** In this thread, data is pulled from 'queue' and stored in 'data', and then written into table.
	  */
	std::thread saving_thread;

	void threadFunction();
	void flush();

	/** Creates new table if it does not exist.
	  * Renames old table if its structure is not suitable.
	  * This cannot be done in constructor to avoid deadlock while renaming a table under locked Context when SystemLog object is created.
	  */
	bool is_prepared = false;
	void prepareTable();
};


template <typename LogElement>
SystemLog<LogElement>::SystemLog(Context & context_,
	const String & database_name_,
	const String & table_name_,
	const String & engine_,
	size_t flush_interval_milliseconds_)
	: context(context_),
	database_name(database_name_), table_name(table_name_), engine(engine_),
	flush_interval_milliseconds(flush_interval_milliseconds_)
{
	log = &Logger::get("SystemLog (" + database_name + "." + table_name + ")");

	data.reserve(DBMS_SYSTEM_LOG_QUEUE_SIZE);
	saving_thread = std::thread([this] { threadFunction(); });
}


template <typename LogElement>
SystemLog<LogElement>::~SystemLog()
{
	/// Tell thread to shutdown.
	queue.push({true, {}});
	saving_thread.join();
}


template <typename LogElement>
void SystemLog<LogElement>::threadFunction()
{
	setThreadName("SystemLogFlush");

	Stopwatch time_after_last_write;
	bool first = true;

	while (true)
	{
		try
		{
			if (first)
			{
				time_after_last_write.restart();
				first = false;
			}

			QueueItem element;
			bool has_element = false;

			if (data.empty())
			{
				queue.pop(element);
				has_element = true;
			}
			else
			{
				size_t milliseconds_elapsed = time_after_last_write.elapsed() / 1000000;
				if (milliseconds_elapsed < flush_interval_milliseconds)
					has_element = queue.tryPop(element, flush_interval_milliseconds - milliseconds_elapsed);
			}

			if (has_element)
			{
				if (element.first)
				{
					/// Shutdown.
					flush();
					break;
				}
				else
					data.push_back(element.second);
			}

			size_t milliseconds_elapsed = time_after_last_write.elapsed() / 1000000;
			if (milliseconds_elapsed >= flush_interval_milliseconds)
			{
				/// Write data to a table.
				flush();
				time_after_last_write.restart();
			}
		}
		catch (...)
		{
			/// In case of exception we lost accumulated data - to avoid locking.
			data.clear();
			tryLogCurrentException(__PRETTY_FUNCTION__);
		}
	}
}


template <typename LogElement>
void SystemLog<LogElement>::flush()
{
	try
	{
		LOG_TRACE(log, "Flushing query log");

		if (!is_prepared)	/// BTW, flush method is called from single thread.
			prepareTable();

		Block block = LogElement::createBlock();
		for (const LogElement & elem : data)
			elem.appendToBlock(block);

		/// We write to table indirectly, using InterpreterInsertQuery.
		/// This is needed to support DEFAULT-columns in table.

		std::unique_ptr<ASTInsertQuery> insert = std::make_unique<ASTInsertQuery>();
		insert->database = database_name;
		insert->table = table_name;
		ASTPtr query_ptr(insert.release());

		InterpreterInsertQuery interpreter(query_ptr, context);
		BlockIO io = interpreter.execute();

		io.out->writePrefix();
		io.out->write(block);
		io.out->writeSuffix();
	}
	catch (...)
	{
		tryLogCurrentException(__PRETTY_FUNCTION__);
	}

	/// In case of exception, also clean accumulated data - to avoid locking.
	data.clear();
}


template <typename LogElement>
void SystemLog<LogElement>::prepareTable()
{
	String description = backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(table_name);

	table = context.tryGetTable(database_name, table_name);

	if (table)
	{
		const Block expected = LogElement::createBlock();
		const Block actual = table->getSampleBlockNonMaterialized();

		if (!blocksHaveEqualStructure(actual, expected))
		{
			/// Переименовываем существующую таблицу.
			int suffix = 0;
			while (context.isTableExist(database_name, table_name + "_" + toString(suffix)))
				++suffix;

			auto rename = std::make_shared<ASTRenameQuery>();

			ASTRenameQuery::Table from;
			from.database = database_name;
			from.table = table_name;

			ASTRenameQuery::Table to;
			to.database = database_name;
			to.table = table_name + "_" + toString(suffix);

			ASTRenameQuery::Element elem;
			elem.from = from;
			elem.to = to;

			rename->elements.emplace_back(elem);

			LOG_DEBUG(log, "Existing table " << description << " for system log has obsolete or different structure."
			" Renaming it to " << backQuoteIfNeed(to.table));

			InterpreterRenameQuery(rename, context).execute();

			/// Нужная таблица будет создана.
			table = nullptr;
		}
		else
			LOG_DEBUG(log, "Will use existing table " << description << " for " + LogElement::name());
	}

	if (!table)
	{
		/// Создаём таблицу.
		LOG_DEBUG(log, "Creating new table " << description << " for " + LogElement::name());

		auto create = std::make_shared<ASTCreateQuery>();

		create->database = database_name;
		create->table = table_name;

		Block sample = LogElement::createBlock();
		create->columns = InterpreterCreateQuery::formatColumns(sample.getColumnsList());

		ParserFunction engine_parser;

		create->storage = parseQuery(engine_parser, engine.data(), engine.data() + engine.size(), "ENGINE to create table for" + LogElement::name());

		InterpreterCreateQuery(create, context).execute();

		table = context.getTable(database_name, table_name);
	}

	is_prepared = true;
}


}
